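// File change notifications built on top of github.com/fsnotify/fsnotify.
// Directories are not watched recursively.
// Virtual file systems such as NFS and SMB are not supported.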

package fsnotify

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/zackeus/go-zero/core/threading"
	"gitee.com/zackeus/goutil/arrutil"
	"gitee.com/zackeus/goutil/fsutil"
	"github.com/fsnotify/fsnotify"
	"github.com/google/uuid"
	"github.com/panjf2000/ants/v2"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// hiddenFilePrefix is the leading character that marks a file name as hidden.
const hiddenFilePrefix = "."

// File operations that a listener can subscribe to. Each value is the
// corresponding fsnotify operation converted to this package's Op type.
const (
	Create = Op(fsnotify.Create)
	Write  = Op(fsnotify.Write)
	Remove = Op(fsnotify.Remove)
	Rename = Op(fsnotify.Rename)
	Chmod  = Op(fsnotify.Chmod)
)

// Op is this package's operation type; its underlying type is fsnotify.Op.
type Op fsnotify.Op

// Event is this package's event type; its underlying type is fsnotify.Event.
// Being a newly defined type, it does not carry over the methods declared on
// fsnotify.Event.
type Event fsnotify.Event

// errFn is the signature of the error callback.
type errFn func(err error)

// eventFn is the signature of a file event callback.
type eventFn func(event Event)

// eventListener pairs a callback with the operations it is interested in.
type eventListener struct {
	// fn is the callback invoked for a matching event.
	fn eventFn

	// ops lists the operations that trigger fn.
	ops []Op
}

// config collects the settings that Options can adjust before a Watcher is built.
type config struct {
	// ignoreHiddenFile reports whether events for hidden files are dropped.
	ignoreHiddenFile bool

	// eventConcurrentSize is the size of the callback goroutine pool.
	eventConcurrentSize uint64

	// errListener is the optional error callback.
	errListener errFn
}

// Options adjusts the configuration used by New.
type Options func(cfg *config)

// WatchOptions adjusts a single listener registered through Watch.
type WatchOptions func(e *eventListener)

// Watcher dispatches file system events of watched directories to the
// callbacks registered for them.
type Watcher struct {
	// mu guards eventListeners.
	mu sync.RWMutex

	// once makes the body of Shutdown run a single time.
	once sync.Once

	// closed is set to true by Shutdown.
	closed atomic.Bool

	// ctx is cancelled by stopFunc; the event loop exits once it is done.
	ctx context.Context

	// stopFunc cancels ctx.
	stopFunc func()

	// core is the underlying fsnotify watcher.
	core *fsnotify.Watcher

	// eventPool is the goroutine pool on which callbacks are executed.
	eventPool *ants.Pool

	// ignoreHiddenFile reports whether events for hidden files are dropped.
	ignoreHiddenFile bool

	// eventListeners maps a cleaned directory path to its listeners, which
	// are in turn keyed by listener id.
	eventListeners map[string]map[string]*eventListener

	// errListener is the optional error callback.
	errListener errFn
}

// New creates a Watcher and starts its event loop in a background goroutine.
//
// The defaults (hidden files ignored, callback pool size of 100, no error
// callback) can be overridden through opts; nil options are skipped.
// An error is returned when either the underlying fsnotify watcher or the
// callback goroutine pool cannot be created.
func New(opts ...Options) (*Watcher, error) {
	core, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	cfg := &config{
		ignoreHiddenFile:    true,
		eventConcurrentSize: 100,
	}
	for _, opt := range opts {
		if opt != nil {
			opt(cfg)
		}
	}

	// Blocking goroutine pool: Submit waits for a free worker instead of failing.
	pool, err := ants.NewPool(int(cfg.eventConcurrentSize),
		ants.WithNonblocking(false),
		ants.WithPreAlloc(false),
		// Expiry interval used when purging workers.
		ants.WithExpiryDuration(1*time.Hour),
		// Keep the periodic purge of workers enabled.
		ants.WithDisablePurge(false),
	)
	if err != nil {
		// Do not leak the already opened fsnotify watcher; the pool error is
		// the one worth reporting, so the close error is intentionally dropped.
		_ = core.Close()
		return nil, err
	}

	// The context lets Shutdown stop the event loop.
	ctx, stop := context.WithCancel(context.Background())
	watcher := &Watcher{
		ctx:              ctx,
		stopFunc:         stop,
		core:             core,
		eventPool:        pool,
		ignoreHiddenFile: cfg.ignoreHiddenFile,
		eventListeners:   make(map[string]map[string]*eventListener, 1),
		errListener:      cfg.errListener,
	}

	threading.GoSafe(watcher.loop)
	return watcher, nil
}

// loop is the event loop of the Watcher. It forwards file events to the
// registered listeners and errors to the error callback until the context is
// cancelled. When either fsnotify channel is closed it shuts the Watcher down
// and returns.
func (w *Watcher) loop() {
	for {
		select {
		case event, ok := <-w.core.Events:
			if !ok {
				w.Shutdown()
				return
			}
			w.dispatch(event)
		case err, ok := <-w.core.Errors:
			if !ok {
				w.Shutdown()
				return
			}
			w.onErr(err)
		case <-w.ctx.Done():
			return
		}
	}
}

// dispatch submits event to the callback pool for every listener that is
// registered on the event's parent directory and subscribed to at least one
// of the event's operations. Each listener is invoked at most once per event.
func (w *Watcher) dispatch(event fsnotify.Event) {
	dir, name := filepath.Split(event.Name)
	if w.ignoreHiddenFile && strings.HasPrefix(name, hiddenFilePrefix) {
		// Hidden files are ignored.
		return
	}
	dir = filepath.Clean(dir)

	// Collect the callbacks under the read lock and release it before
	// submitting: Submit may block on a full pool, and the lock must be
	// released on every path, including when no listener matches.
	var callbacks []eventFn
	w.mu.RLock()
	for _, listener := range w.eventListeners[dir] {
		for _, op := range listener.ops {
			if event.Has(fsnotify.Op(op)) {
				callbacks = append(callbacks, listener.fn)
				// One match is enough; an event carrying several
				// operations must not trigger the callback repeatedly.
				break
			}
		}
	}
	w.mu.RUnlock()

	for _, fn := range callbacks {
		fn := fn // per-iteration copy for toolchains older than Go 1.22
		// Submit only fails once the pool has been released during
		// shutdown, in which case the event is intentionally dropped.
		_ = w.eventPool.Submit(func() {
			fn(Event(event))
		})
	}
}

// onErr hands err to the error callback on the callback pool.
// It does nothing when no error callback has been configured.
func (w *Watcher) onErr(err error) {
	if w.errListener == nil {
		return
	}

	report := func() {
		w.errListener(err)
	}
	_ = w.eventPool.Submit(report)
}

// Available reports whether the Watcher is still usable, i.e. has not been
// marked as closed.
func (w *Watcher) Available() bool {
	closed := w.closed.Load()
	return !closed
}

// Watch registers fn as a listener for events in the directory path and
// returns the id of the new listener, which can later be passed to UnWatch.
//
// By default the listener receives Create, Write, Remove and Rename events;
// opts can change that. Nil options are skipped.
//
// An error is returned when fn is nil, when the Watcher has been shut down,
// when path is not a directory, or when the underlying watcher refuses the path.
func (w *Watcher) Watch(path string, fn eventFn, opts ...WatchOptions) (string, error) {
	if fn == nil {
		// A nil callback would only fail later, inside a pool worker.
		return "", errors.New("the event callback must not be nil")
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed.Load() {
		return "", errors.New("the file notify watcher has closed")
	}
	if !fsutil.IsDir(path) {
		// Only directories can be watched.
		return "", fmt.Errorf("the path:[%s] not a dir", path)
	}

	// Normalise the path so it matches the directory derived from event names.
	path = filepath.Clean(path)
	if err := w.core.Add(path); err != nil {
		return "", err
	}

	listener := &eventListener{
		fn:  fn,
		ops: []Op{Create, Write, Remove, Rename},
	}
	for _, opt := range opts {
		if opt != nil {
			opt(listener)
		}
	}

	listeners, ok := w.eventListeners[path]
	if !ok {
		listeners = make(map[string]*eventListener, 1)
		w.eventListeners[path] = listeners
	}

	id := uuid.New().String()
	listeners[id] = listener
	return id, nil
}

// UnWatch removes listeners from the directory path.
//
// Without ids every listener of path is removed. With ids only those
// listeners are removed, and the directory stays watched as long as at least
// one other listener remains registered for it. The underlying watch is
// released only once no listener is left.
//
// An error is returned when the Watcher has been shut down or when the
// underlying watcher fails to remove the path.
func (w *Watcher) UnWatch(path string, ids ...string) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.closed.Load() {
		return errors.New("the file notify watcher has closed")
	}

	// Normalise the path the same way Watch does.
	path = filepath.Clean(path)

	if listeners, ok := w.eventListeners[path]; ok && len(ids) > 0 {
		for _, id := range ids {
			delete(listeners, id)
		}
		if len(listeners) > 0 {
			// Other listeners still depend on the watch of this directory.
			return nil
		}
	}

	if err := w.core.Remove(path); err != nil {
		return err
	}
	// Drop the (now empty or fully unsubscribed) entry for this directory.
	delete(w.eventListeners, path)
	return nil
}

// Shutdown stops the Watcher: it marks it as closed, cancels the event loop,
// closes the underlying fsnotify watcher and releases the callback pool.
// Only the first call has an effect; later calls return immediately.
func (w *Watcher) Shutdown() {
	w.once.Do(func() {
		// Flag the watcher as closed first so Watch/UnWatch reject new work.
		w.closed.Store(true)
		w.stopFunc()

		if w.core != nil {
			// Nothing useful can be done with a close error during shutdown.
			_ = w.core.Close()
		}
		if w.eventPool != nil {
			// Release the pool so its goroutines do not outlive the Watcher.
			w.eventPool.Release()
		}
	})
}

// WithEventConcurrentSize returns an option that sets the size of the
// goroutine pool used to run callbacks.
func WithEventConcurrentSize(size uint64) Options {
	return Options(func(c *config) {
		c.eventConcurrentSize = size
	})
}

// WithIgnoreHiddenFile returns an option that controls whether events for
// hidden files are ignored.
func WithIgnoreHiddenFile(b bool) Options {
	return Options(func(c *config) {
		c.ignoreHiddenFile = b
	})
}

// WithOnErr returns an option that installs fn as the error callback.
func WithOnErr(fn errFn) Options {
	return Options(func(c *config) {
		c.errListener = fn
	})
}

// WithWatchOps returns a watch option that replaces the operations a listener
// is subscribed to with ops. Duplicates are removed, and the listener receives
// its own copy of the slice.
func WithWatchOps(ops []Op) WatchOptions {
	return func(e *eventListener) {
		// Deduplication is only needed when there is more than one entry.
		if len(ops) > 1 {
			ops = arrutil.Unique(ops)
		}

		subscribed := make([]Op, 0, len(ops))
		e.ops = append(subscribed, ops...)
	}
}
